# 4.3 实战：接入数据库和 API

**第4周 | 第3课 | 连接真实数据源 | 预计时长：45分钟**

---

## 学习目标

完成本课后，你将能够：

- 创建 SQLite 数据库并编写 MCP 工具进行查询
- 实现 MCP 工具调用外部 HTTP API
- 在 MCP Server 中处理错误、分页、超时
- 构建一个能同时查数据库和调 API 的 Agent

---

## 1. 环境准备

```bash
mkdir mcp-data-api && cd mcp-data-api
python -m venv .venv

# Windows
.venv\Scripts\activate
# macOS/Linux
# source .venv/bin/activate

# 安装依赖
pip install mcp httpx aiosqlite
```

```txt
# requirements.txt
mcp>=1.0.0
httpx>=0.27.0        # 异步 HTTP 客户端
aiosqlite>=0.20.0    # 异步 SQLite
```

---

## 2. Part 1：接入 SQLite 数据库

### 2.1 创建测试数据库

```python
# setup_db.py
"""
初始化 SQLite 数据库：创建用户、订单、商品表，并插入测试数据
"""
import sqlite3
import os

DB_PATH = "trade.db"


def setup_database():
    """创建数据库并填充测试数据"""
    # 如果数据库已存在则删除（每次重新创建）
    if os.path.exists(DB_PATH):
        os.remove(DB_PATH)

    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()

    # --- 创建表 ---

    # 用户表
    cursor.execute("""
        CREATE TABLE users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            email TEXT UNIQUE,
            phone TEXT,
            balance REAL DEFAULT 0,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)

    # 商品表
    cursor.execute("""
        CREATE TABLE products (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            category TEXT,
            price REAL NOT NULL,
            stock INTEGER DEFAULT 0,
            description TEXT
        )
    """)

    # 订单表
    cursor.execute("""
        CREATE TABLE orders (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER NOT NULL,
            product_id INTEGER NOT NULL,
            quantity INTEGER NOT NULL,
            total_amount REAL NOT NULL,
            status TEXT DEFAULT 'pending',
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (user_id) REFERENCES users(id),
            FOREIGN KEY (product_id) REFERENCES products(id)
        )
    """)

    # --- 插入测试数据 ---

    # 用户数据
    users = [
        ("张三", "zhangsan@example.com", "13800001001", 5000.00),
        ("李四", "lisi@example.com", "13800001002", 8000.00),
        ("王五", "wangwu@example.com", "13800001003", 3000.00),
        ("赵六", "zhaoliu@example.com", "13800001004", 12000.00),
        ("孙七", "sunqi@example.com", "13800001005", 6500.00),
    ]
    cursor.executemany(
        "INSERT INTO users (name, email, phone, balance) VALUES (?, ?, ?, ?)",
        users,
    )

    # 商品数据
    products = [
        ("iPhone 15", "手机", 5999.00, 50, "Apple iPhone 15 128GB"),
        ("MacBook Air", "电脑", 7999.00, 30, "Apple MacBook Air M2 256GB"),
        ("AirPods Pro", "耳机", 1799.00, 100, "Apple AirPods Pro 2代"),
        ("iPad mini", "平板", 3999.00, 40, "Apple iPad mini 6 64GB"),
        ("Apple Watch", "手表", 2999.00, 60, "Apple Watch Series 9"),
        ("华为 Mate 60", "手机", 5499.00, 45, "华为 Mate 60 Pro 256GB"),
        ("小米 14", "手机", 3999.00, 80, "小米 14 Pro 256GB"),
    ]
    cursor.executemany(
        "INSERT INTO products (name, category, price, stock, description) "
        "VALUES (?, ?, ?, ?, ?)",
        products,
    )

    # 订单数据
    orders = [
        (1, 1, 1, 5999.00, "completed"),
        (1, 3, 2, 3598.00, "completed"),
        (2, 2, 1, 7999.00, "completed"),
        (3, 6, 1, 5499.00, "pending"),
        (4, 4, 1, 3999.00, "shipped"),
        (4, 5, 1, 2999.00, "completed"),
        (5, 7, 2, 7998.00, "pending"),
    ]
    cursor.executemany(
        "INSERT INTO orders (user_id, product_id, quantity, total_amount, status) "
        "VALUES (?, ?, ?, ?, ?)",
        orders,
    )

    conn.commit()
    conn.close()
    print(f"✅ 数据库创建完成：{DB_PATH}")
    print(f"  用户数：{len(users)}")
    print(f"  商品数：{len(products)}")
    print(f"  订单数：{len(orders)}")


if __name__ == "__main__":
    setup_database()
```

```bash
python setup_db.py
```

### 2.2 数据库 MCP Server

```python
# db_server.py
"""
数据库 MCP Server：提供用户、订单、商品查询工具
"""
import sqlite3
from mcp.server.fastmcp import FastMCP

DB_PATH = "trade.db"

mcp = FastMCP(
    name="database-server",
    description="数据库查询工具：用户、订单、商品",
)


def _get_db():
    """获取数据库连接（每次查询新建连接，保证线程安全）"""
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row  # 允许通过列名访问
    return conn


# ==================== 用户相关工具 ====================

@mcp.tool()
def list_users(page: int = 1, page_size: int = 10) -> str:
    """
    列出所有用户（支持分页）。

    Args:
        page: 页码，从 1 开始
        page_size: 每页数量，默认 10

    Returns:
        格式化的用户列表
    """
    offset = (page - 1) * page_size

    conn = _get_db()
    cursor = conn.cursor()

    # 查询总数
    cursor.execute("SELECT COUNT(*) FROM users")
    total = cursor.fetchone()[0]

    # 查询当前页数据
    cursor.execute(
        "SELECT id, name, email, phone, balance, created_at "
        "FROM users ORDER BY id LIMIT ? OFFSET ?",
        (page_size, offset),
    )
    rows = cursor.fetchall()
    conn.close()

    # 格式化输出
    lines = [f"用户列表（第 {page} 页，共 {total} 条）："]
    lines.append("-" * 80)
    for row in rows:
        lines.append(
            f"  ID: {row['id']} | {row['name']} | {row['email']} | "
            f"余额: ¥{row['balance']:.2f} | 注册: {row['created_at']}"
        )
    total_pages = (total + page_size - 1) // page_size
    lines.append(f"\n共 {total_pages} 页，当前第 {page}/{total_pages} 页")

    return "\n".join(lines)


@mcp.tool()
def get_user(user_id: int) -> str:
    """
    查询指定用户的详细信息。

    Args:
        user_id: 用户 ID

    Returns:
        用户详细信息
    """
    conn = _get_db()
    cursor = conn.cursor()
    cursor.execute(
        "SELECT id, name, email, phone, balance, created_at FROM users WHERE id = ?",
        (user_id,),
    )
    row = cursor.fetchone()
    conn.close()

    if not row:
        return f"错误：未找到 ID 为 {user_id} 的用户"

    return (
        f"用户详情：\n"
        f"  ID: {row['id']}\n"
        f"  姓名: {row['name']}\n"
        f"  邮箱: {row['email']}\n"
        f"  手机: {row['phone']}\n"
        f"  余额: ¥{row['balance']:.2f}\n"
        f"  注册时间: {row['created_at']}"
    )


# ==================== 订单相关工具 ====================

@mcp.tool()
def get_user_orders(user_id: int) -> str:
    """
    查询指定用户的所有订单。

    Args:
        user_id: 用户 ID

    Returns:
        用户的订单列表
    """
    conn = _get_db()
    cursor = conn.cursor()

    # 先验证用户是否存在
    cursor.execute("SELECT name FROM users WHERE id = ?", (user_id,))
    user = cursor.fetchone()
    if not user:
        conn.close()
        return f"错误：未找到 ID 为 {user_id} 的用户"

    # 查询订单（关联商品表获取商品名称）
    cursor.execute("""
        SELECT o.id, o.quantity, o.total_amount, o.status, o.created_at,
               p.name as product_name
        FROM orders o
        JOIN products p ON o.product_id = p.id
        WHERE o.user_id = ?
        ORDER BY o.created_at DESC
    """, (user_id,))
    rows = cursor.fetchall()
    conn.close()

    if not rows:
        return f"用户 {user['name']}（ID: {user_id}）暂无订单记录"

    status_cn = {
        "pending": "待付款",
        "paid": "已付款",
        "shipped": "已发货",
        "completed": "已完成",
        "cancelled": "已取消",
    }

    lines = [f"{user['name']} 的订单（共 {len(rows)} 笔）："]
    lines.append("-" * 80)
    for row in rows:
        status_text = status_cn.get(row["status"], row["status"])
        lines.append(
            f"  订单#{row['id']} | {row['product_name']} × {row['quantity']} | "
            f"¥{row['total_amount']:.2f} | {status_text} | {row['created_at']}"
        )

    total = sum(row["total_amount"] for row in rows)
    lines.append(f"\n累计消费：¥{total:.2f}")

    return "\n".join(lines)


@mcp.tool()
def get_order(order_id: int) -> str:
    """
    查询指定订单的详细信息。

    Args:
        order_id: 订单 ID

    Returns:
        订单详细信息
    """
    conn = _get_db()
    cursor = conn.cursor()
    cursor.execute("""
        SELECT o.id, o.quantity, o.total_amount, o.status, o.created_at,
               p.name as product_name, p.price as unit_price,
               u.name as user_name
        FROM orders o
        JOIN products p ON o.product_id = p.id
        JOIN users u ON o.user_id = u.id
        WHERE o.id = ?
    """, (order_id,))
    row = cursor.fetchone()
    conn.close()

    if not row:
        return f"错误：未找到 ID 为 {order_id} 的订单"

    status_cn = {
        "pending": "待付款",
        "paid": "已付款",
        "shipped": "已发货",
        "completed": "已完成",
        "cancelled": "已取消",
    }

    return (
        f"订单详情 #{row['id']}：\n"
        f"  用户：{row['user_name']}\n"
        f"  商品：{row['product_name']}\n"
        f"  单价：¥{row['unit_price']:.2f}\n"
        f"  数量：{row['quantity']}\n"
        f"  总价：¥{row['total_amount']:.2f}\n"
        f"  状态：{status_cn.get(row['status'], row['status'])}\n"
        f"  下单时间：{row['created_at']}"
    )


# ==================== 商品相关工具 ====================

@mcp.tool()
def search_products(keyword: str = "", category: str = "") -> str:
    """
    搜索商品（支持关键词和分类筛选）。

    Args:
        keyword: 搜索关键词（匹配商品名称和描述）
        category: 商品分类筛选

    Returns:
        匹配的商品列表
    """
    conn = _get_db()
    cursor = conn.cursor()

    # 构建动态查询
    conditions = []
    params = []

    if keyword:
        conditions.append("(name LIKE ? OR description LIKE ?)")
        params.extend([f"%{keyword}%", f"%{keyword}%"])

    if category:
        conditions.append("category = ?")
        params.append(category)

    where_clause = " AND ".join(conditions) if conditions else "1=1"

    cursor.execute(
        f"SELECT id, name, category, price, stock, description "
        f"FROM products WHERE {where_clause} ORDER BY id",
        params,
    )
    rows = cursor.fetchall()
    conn.close()

    if not rows:
        return "未找到匹配的商品"

    lines = [f"商品列表（共 {len(rows)} 件）："]
    lines.append("-" * 80)
    for row in rows:
        stock_status = "✅ 有货" if row["stock"] > 0 else "❌ 缺货"
        lines.append(
            f"  ID: {row['id']} | {row['name']} | {row['category']} | "
            f"¥{row['price']:.2f} | 库存: {row['stock']} {stock_status}"
        )
        if row["description"]:
            lines.append(f"    描述: {row['description']}")

    return "\n".join(lines)


if __name__ == "__main__":
    mcp.run()
```

### 2.3 数据库 Client 测试

```python
# db_client.py
"""
测试数据库 MCP Server 的所有工具
"""
import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def main():
    params = StdioServerParameters(command="python", args=["db_server.py"])

    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            # 1. 列出所有用户
            print("=" * 60)
            print("📋 列出用户（第1页）")
            print("=" * 60)
            r = await session.call_tool("list_users", {"page": 1, "page_size": 3})
            print(r.content[0].text)
            print()

            # 2. 查看用户详情
            print("=" * 60)
            print("👤 用户详情：张三（ID=1）")
            print("=" * 60)
            r = await session.call_tool("get_user", {"user_id": 1})
            print(r.content[0].text)
            print()

            # 3. 查看用户订单
            print("=" * 60)
            print("📦 张三的订单历史")
            print("=" * 60)
            r = await session.call_tool("get_user_orders", {"user_id": 1})
            print(r.content[0].text)
            print()

            # 4. 查看订单详情
            print("=" * 60)
            print("📄 订单详情 #3")
            print("=" * 60)
            r = await session.call_tool("get_order", {"order_id": 3})
            print(r.content[0].text)
            print()

            # 5. 搜索商品
            print("=" * 60)
            print("🔍 搜索商品：关键词 '手机'")
            print("=" * 60)
            r = await session.call_tool("search_products", {"keyword": "手机"})
            print(r.content[0].text)
            print()

            # 6. 按分类筛选
            print("=" * 60)
            print("🔍 搜索商品：分类 '电脑'")
            print("=" * 60)
            r = await session.call_tool("search_products", {"category": "电脑"})
            print(r.content[0].text)
            print()


if __name__ == "__main__":
    asyncio.run(main())
```

---

## 3. Part 2：接入外部 HTTP API

### 3.1 API MCP Server

我们将接入一个公开的免费天气 API（使用 Open-Meteo，无需 API Key）：

```python
# api_server.py
"""
API MCP Server：调用外部 HTTP API
示例：实时天气查询（使用 Open-Meteo 免费 API）
"""
import asyncio
import httpx
from mcp.server.fastmcp import FastMCP

mcp = FastMCP(
    name="api-server",
    description="外部 API 工具：天气查询、新闻获取",
)

# Open-Meteo 免费天气 API，无需 API Key
WEATHER_API = "https://api.open-meteo.com/v1/forecast"

# 城市坐标
CITY_COORDS = {
    "北京": {"lat": 39.9042, "lon": 116.4074},
    "上海": {"lat": 31.2304, "lon": 121.4737},
    "广州": {"lat": 23.1291, "lon": 113.2644},
    "深圳": {"lat": 22.5431, "lon": 114.0579},
    "成都": {"lat": 30.5728, "lon": 104.0668},
    "杭州": {"lat": 30.2741, "lon": 120.1551},
    "武汉": {"lat": 30.5928, "lon": 114.3055},
    "西安": {"lat": 34.3416, "lon": 108.9398},
    "东京": {"lat": 35.6762, "lon": 139.6503},
    "纽约": {"lat": 40.7128, "lon": -74.0060},
}

# 天气代码中文映射
WEATHER_CODES = {
    0: "晴", 1: "大部分晴", 2: "多云", 3: "阴",
    45: "雾", 48: "冻雾",
    51: "小毛毛雨", 53: "中毛毛雨", 55: "大毛毛雨",
    61: "小雨", 63: "中雨", 65: "大雨",
    71: "小雪", 73: "中雪", 75: "大雪",
    80: "小阵雨", 81: "中阵雨", 82: "大阵雨",
    95: "雷暴", 96: "雷暴伴冰雹", 99: "强雷暴伴冰雹",
}


@mcp.tool()
async def get_realtime_weather(city: str) -> str:
    """
    获取指定城市的实时天气（调用 Open-Meteo API）。

    Args:
        city: 城市名称

    Returns:
        实时天气信息
    """
    if city not in CITY_COORDS:
        return (
            f"错误：不支持的城市 '{city}'。\n"
            f"支持的城市：{', '.join(CITY_COORDS.keys())}"
        )

    coords = CITY_COORDS[city]
    params = {
        "latitude": coords["lat"],
        "longitude": coords["lon"],
        "current": "temperature_2m,relative_humidity_2m,weather_code,wind_speed_10m",
        "timezone": "auto",
    }

    try:
        # 异步 HTTP 请求
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(WEATHER_API, params=params)

            # 检查 HTTP 状态码
            if response.status_code != 200:
                return f"错误：API 返回状态码 {response.status_code}"

            data = response.json()

            # 解析响应数据
            current = data.get("current", {})
            weather_code = current.get("weather_code", -1)
            weather_desc = WEATHER_CODES.get(weather_code, f"未知({weather_code})")

            return (
                f"{city} 实时天气：\n"
                f"  天气：{weather_desc}\n"
                f"  温度：{current.get('temperature_2m', 'N/A')}°C\n"
                f"  湿度：{current.get('relative_humidity_2m', 'N/A')}%\n"
                f"  风速：{current.get('wind_speed_10m', 'N/A')} km/h\n"
                f"  数据来源：Open-Meteo"
            )

    except httpx.TimeoutException:
        return f"错误：请求超时，请稍后重试"
    except httpx.ConnectError:
        return f"错误：无法连接到天气服务"
    except Exception as e:
        return f"错误：获取天气信息失败 — {str(e)}"


@mcp.tool()
async def get_weather_forecast(city: str, days: int = 3) -> str:
    """
    获取指定城市的天气预报（未来几天）。

    Args:
        city: 城市名称
        days: 预报天数（1-7 天）

    Returns:
        天气预报信息
    """
    if city not in CITY_COORDS:
        return f"错误：不支持的城市 '{city}'"

    if days < 1 or days > 7:
        return "错误：预报天数必须在 1-7 天之间"

    coords = CITY_COORDS[city]
    params = {
        "latitude": coords["lat"],
        "longitude": coords["lon"],
        "daily": "weather_code,temperature_2m_max,temperature_2m_min",
        "timezone": "auto",
        "forecast_days": days,
    }

    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(WEATHER_API, params=params)
            response.raise_for_status()
            data = response.json()

            daily = data.get("daily", {})
            dates = daily.get("time", [])
            codes = daily.get("weather_code", [])
            temps_max = daily.get("temperature_2m_max", [])
            temps_min = daily.get("temperature_2m_min", [])

            lines = [f"{city} 天气预报（未来 {days} 天）："]
            lines.append("-" * 50)

            for i in range(len(dates)):
                weather_desc = WEATHER_CODES.get(codes[i], "未知")
                lines.append(
                    f"  {dates[i]} | {weather_desc} | "
                    f"{temps_min[i]}°C ~ {temps_max[i]}°C"
                )

            return "\n".join(lines)

    except httpx.HTTPStatusError as e:
        return f"错误：API 请求失败 — {e.response.status_code}"
    except Exception as e:
        return f"错误：获取天气预报失败 — {str(e)}"


if __name__ == "__main__":
    mcp.run()
```

### 3.2 API Client 测试

```python
# api_client.py
"""
测试 API MCP Server
"""
import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def main():
    params = StdioServerParameters(command="python", args=["api_server.py"])

    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            # 1. 实时天气
            print("=" * 60)
            print("🌤  实时天气：北京")
            print("=" * 60)
            r = await session.call_tool("get_realtime_weather", {"city": "北京"})
            print(r.content[0].text)
            print()

            # 2. 天气预报
            print("=" * 60)
            print("📅 天气预报：上海（3天）")
            print("=" * 60)
            r = await session.call_tool("get_weather_forecast", {"city": "上海", "days": 3})
            print(r.content[0].text)
            print()

            # 3. 错误处理测试 — 不支持的城市
            print("=" * 60)
            print("❌ 测试：不支持的城市 '巴黎'")
            print("=" * 60)
            r = await session.call_tool("get_realtime_weather", {"city": "巴黎"})
            print(r.content[0].text)
            print()


if __name__ == "__main__":
    asyncio.run(main())
```

---

## 4. Part 3：整合 Server — 数据库 + API

实际项目中，Agent 往往需要同时使用数据库和外部 API。有两种方式实现：

### 方案 A：合并到一个 Server

```python
# unified_server.py
"""
统一的 MCP Server：同时提供数据库查询和 API 调用
"""
import sqlite3
import httpx
from mcp.server.fastmcp import FastMCP

DB_PATH = "trade.db"
mcp = FastMCP("unified-server")


# === 数据库工具 ===

def _get_db():
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn


@mcp.tool()
def list_users(page: int = 1) -> str:
    """列出用户"""
    conn = _get_db()
    cursor = conn.cursor()
    cursor.execute(
        "SELECT id, name, email, balance FROM users "
        f"ORDER BY id LIMIT 10 OFFSET {(page - 1) * 10}"
    )
    rows = cursor.fetchall()
    conn.close()
    return "\n".join(
        f"  {r['id']}. {r['name']} | {r['email']} | ¥{r['balance']:.2f}"
        for r in rows
    )


@mcp.tool()
def search_products(keyword: str = "") -> str:
    """搜索商品"""
    conn = _get_db()
    cursor = conn.cursor()
    if keyword:
        cursor.execute(
            "SELECT id, name, price, stock FROM products "
            "WHERE name LIKE ? ORDER BY id",
            (f"%{keyword}%",),
        )
    else:
        cursor.execute("SELECT id, name, price, stock FROM products ORDER BY id")
    rows = cursor.fetchall()
    conn.close()
    return "\n".join(
        f"  {r['id']}. {r['name']} | ¥{r['price']:.2f} | 库存:{r['stock']}"
        for r in rows
    )


# === API 工具 ===

CITY_COORDS = {
    "北京": (39.9042, 116.4074),
    "上海": (31.2304, 121.4737),
    "广州": (23.1291, 113.2644),
}

WEATHER_CODES = {0: "晴", 1: "大部晴", 2: "多云", 3: "阴", 61: "小雨", 63: "中雨", 95: "雷暴"}


@mcp.tool()
async def get_weather(city: str) -> str:
    """获取实时天气"""
    if city not in CITY_COORDS:
        return f"不支持的城市：{city}"

    lat, lon = CITY_COORDS[city]
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            resp = await client.get(
                "https://api.open-meteo.com/v1/forecast",
                params={
                    "latitude": lat, "longitude": lon,
                    "current": "temperature_2m,weather_code",
                    "timezone": "auto",
                },
            )
            data = resp.json()
            current = data["current"]
            code = current.get("weather_code", -1)
            return f"{city}：{WEATHER_CODES.get(code, '未知')}，{current.get('temperature_2m')}°C"
    except Exception as e:
        return f"天气查询失败：{e}"


if __name__ == "__main__":
    mcp.run()
```

### 方案 B：独立 Server，Client 连接多个

见下一课 4.4 的详细讲解。

---

## 5. 错误处理与最佳实践

### 5.1 工具内部错误处理模式

```python
@mcp.tool()
async def safe_api_call(endpoint: str) -> str:
    """安全的 API 调用模板"""

    # 1. 输入校验
    if not endpoint:
        return "错误：endpoint 不能为空"

    # 2. 超时控制
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            # 3. 发起请求
            response = await client.get(endpoint)

            # 4. 检查状态码
            if response.status_code == 404:
                return "错误：请求的资源不存在"
            if response.status_code == 429:
                return "错误：请求过于频繁，请稍后重试（速率限制）"
            if response.status_code != 200:
                return f"错误：服务异常（HTTP {response.status_code}）"

            # 5. 解析响应
            data = response.json()

            # 6. 数据验证
            if not data:
                return "错误：返回的数据为空"

            return format_result(data)

    except httpx.TimeoutException:
        return "错误：请求超时"
    except httpx.ConnectError:
        return "错误：无法连接到服务"
    except ValueError:
        return "错误：响应格式不正确（非 JSON）"
    except Exception as e:
        return f"错误：未知异常 — {type(e).__name__}: {e}"
```

### 5.2 分页处理

```python
@mcp.tool()
def list_orders(user_id: int, page: int = 1, page_size: int = 10) -> str:
    """
    列出用户订单（分页）。

    分页三要素：page（页码）、page_size（每页数量）、返回总数信息
    """
    offset = (page - 1) * page_size

    conn = _get_db()
    cursor = conn.cursor()

    # 总记录数
    cursor.execute("SELECT COUNT(*) FROM orders WHERE user_id = ?", (user_id,))
    total = cursor.fetchone()[0]

    # 当前页数据
    cursor.execute(
        "SELECT id, total_amount, status, created_at FROM orders "
        "WHERE user_id = ? ORDER BY created_at DESC LIMIT ? OFFSET ?",
        (user_id, page_size, offset),
    )
    rows = cursor.fetchall()
    conn.close()

    total_pages = (total + page_size - 1) // page_size

    lines = [
        f"订单列表（第 {page} 页，共 {total} 条，{total_pages} 页）：",
    ]
    # ... 格式化输出
    return "\n".join(lines)
```

---

## 动手练习

### 练习 1：扩展数据库工具

在 `db_server.py` 中添加以下工具：

1. `get_user_stats()` — 统计信息：总用户数、总订单数、总销售额
2. `get_top_products(limit: int = 5)` — 销量最高的商品排名
3. `get_category_stats()` — 各分类的销售统计

测试工具是否正常工作。

### 练习 2：添加新闻 API 工具

使用免费的新闻 API（或任意公开 API），添加一个 `get_news(category: str, count: int = 5)` 工具。注意处理：
- API 超时
- 无效响应
- 空结果

### 练习 3：整合测试

编写一个 Client 脚本，模拟以下场景：
1. 用户查询张三的账户信息
2. 查看张三的订单历史
3. 查询张三所在城市的天气
4. 根据订单中的商品分类搜索同类商品

展示 Agent 如何在一次对话流程中组合使用数据库工具和 API 工具。

---

## 本课总结

- **数据库接入**：用 `sqlite3` + MCP tool，每个工具封装一个查询场景
- **API 接入**：用 `httpx.AsyncClient` 做异步 HTTP 请求，注意超时和错误处理
- **错误处理**：工具内部捕获异常，返回友好的错误消息（而非抛出）
- **分页**：返回数据的同时提供总记录数和总页数
- **整合方式**：多个工具可以放在一个 Server，也可以拆分为多个 Server
- **异步工具**：用 `async def` 定义工具，支持 `await` 异步操作

---

## 下一课预告

下一课我们将探索**多 MCP Server 协同**：同时运行数据库 Server、API Server 和文件 Server，让一个 Agent 在多个 Server 之间自由调用工具，完成更复杂的任务。
